﻿/****************************************************************
*   作者：Morain
*   创建时间：2018/2/19 19:39:26
*   描述说明：
*****************************************************************/
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace Model
{
  public class TChannel : AChannel
  {
    private readonly TcpClient tcpClient;

    private readonly RingBuffer recvBuffer = new RingBuffer();
    private readonly RingBuffer sendBuffer = new RingBuffer();

    private bool isSending;
    private readonly PacketParser parser;
    private bool isConnected;
    private TaskCompletionSource<Packet> recvTcs;

    /// <summary>
    /// connect
    /// </summary>
    /// <param name="tcpClient"></param>
    /// <param name="iPEndPoint"></param>
    /// <param name="service"></param>
    public TChannel(TcpClient tcpClient, IPEndPoint iPEndPoint, AService service) : base(service, ChannelType.Connect)
    {
      this.tcpClient = tcpClient;
      this.parser = new PacketParser(this.recvBuffer);
      this.RemoteAddress = iPEndPoint;

      this.ConnectAsync(iPEndPoint);
    }

    /// <summary>
    /// accept
    /// </summary>
    /// <param name="tcpClient"></param>
    /// <param name="service"></param>
    public TChannel(TcpClient tcpClient, AService service) : base(service, ChannelType.Accept)
    {
      this.tcpClient = tcpClient;
      this.parser = new PacketParser(this.recvBuffer);

      IPEndPoint iPEndPoint = (IPEndPoint)this.tcpClient.Client.RemoteEndPoint;
      this.RemoteAddress = iPEndPoint;

      this.OnAccepted();
    }

    private async void ConnectAsync(IPEndPoint iPEndPoint)
    {
      try
      {
        await this.tcpClient.ConnectAsync(iPEndPoint.Address, iPEndPoint.Port);
        this.isConnected = true;
        this.StartSend();
        this.StartRecv();
      }
      catch (Exception e)
      {
        Log.Error($"connect error:{e.Message}");
      }
    }

    private void OnAccepted()
    {
      this.isConnected = true;
      this.StartSend();
      this.StartRecv();
    }

    /// <summary>
    /// 接收数据
    /// </summary>
    /// <returns></returns>
    public override Task<Packet> Recv()
    {
      if (this.Id == 0)
      {
        throw new Exception("TChannel已经被Dispose, 不能接收消息");
      }

      bool isOK = this.parser.Parse();
      if (isOK)
      {
        Packet packet = this.parser.GetPacket();
        return Task.FromResult(packet);
      }

      recvTcs = new TaskCompletionSource<Packet>();
      return recvTcs.Task;
    }

    /// <summary>
    /// 开始接收
    /// </summary>
    private async void StartRecv()
    {
      try
      {
        while (true)
        {
          if (this.Id == 0) return;

          int size = this.recvBuffer.ChunkSize - this.recvBuffer.LastIndex;

          int n = await this.tcpClient.GetStream().ReadAsync(this.recvBuffer.Last, this.recvBuffer.LastIndex, size);

          if (n == 0)
          {
            this.OnError(this, SocketError.SocketError);
            return;
          }

          this.recvBuffer.LastIndex += n;

          if (this.recvBuffer.LastIndex == this.recvBuffer.ChunkSize)
          {
            this.recvBuffer.AddLast();
            this.recvBuffer.LastIndex = 0;
          }

          if (this.recvTcs != null)
          {
            bool isOK = this.parser.Parse();
            if (isOK)
            {
              Packet packet = this.parser.GetPacket();
              var tcs = this.recvTcs;
              this.recvTcs = null;
              tcs.SetResult(packet);
            }
          }

        }
      }
      catch (Exception e)
      {
        Log.Error(e.ToString());
        this.OnError(this, SocketError.SocketError);
      }
    }

    public override void Send(byte[] buffer)
    {
      if (this.Id == 0)
        throw new Exception("TChannel已经被Dispose，不能发送消息。");

      byte[] size = BitConverter.GetBytes((ushort)buffer.Length);
      this.sendBuffer.SendTo(size);
      this.sendBuffer.SendTo(buffer);
      if (this.isConnected)
      {
        // 开始发送
        StartSend();
      }
    }

    public override void Send(List<byte[]> buffers)
    {
      if (this.Id == 0)
        throw new Exception("TChannel已经被Dispose，不能发送消息。");
      ushort size = 0;
      foreach (byte[] item in buffers)
      {
        size += (ushort)item.Length;
      }

      byte[] sizeBuffer = BitConverter.GetBytes(size);
      this.sendBuffer.SendTo(sizeBuffer);
      foreach (var item in buffers)
      {
        this.sendBuffer.SendTo(item);
      }
      if (this.isConnected)
      {
        this.StartSend();
      }
    }

    /// <summary>
    /// 开始发送缓冲中的消息
    /// </summary>
    private async void StartSend()
    {
      try
      {
        if (this.Id == 0) return;
        // 正在发送不需要发送
        if (this.isSending) return;

        while (true)
        {
          if (this.Id == 0) return;

          if (this.sendBuffer.Count == 0)
          {
            this.isSending = false;
            return;
          }

          this.isSending = true;

          int sendSize = this.sendBuffer.ChunkSize - this.sendBuffer.FirstIndex;
          if (sendSize > this.sendBuffer.Count)
          {
            sendSize = this.sendBuffer.Count;
          }

          await this.tcpClient.GetStream().WriteAsync(this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize);
          this.sendBuffer.FirstIndex += sendSize;
          if (this.sendBuffer.FirstIndex == this.sendBuffer.ChunkSize)
          {
            this.sendBuffer.FirstIndex = 0;
            this.sendBuffer.RemoveFirst();
          }
        }
      }
      catch (Exception e)
      {
        Log.Error(e.ToString());
        this.OnError(this, SocketError.SocketError);
      }
    }

    /// <summary>
    /// 释放资源
    /// </summary>
    public override void Dispose()
    {
      if (Id == 0) return;
      base.Dispose();
      this.tcpClient.Close();
    }
  }
}
